// SPDX-License-Identifier: GPL-3.0-or-later

#include "libnetdata/libnetdata.h"
#include "agent_cloud_link.h"

// Read from the config file -- new section [agent_cloud_link]
// Defaults are supplied

// Cloud endpoint. Both start at the compiled-in defaults and are overwritten
// by aclk_main() from the CONFIG_SECTION_ACLK section of the configuration.
int aclk_port = ACLK_DEFAULT_PORT;
char *aclk_hostname = ACLK_DEFAULT_HOST;
// Non-zero once aclk_subscribe() on the command topic has succeeded;
// cleared by aclk_disconnect().
int aclk_subscribed = 0;
// Not referenced in the visible part of this file.
int aclk_disable_single_updates = 0;

// 0: the "on_connect" metadata command still has to be queued,
// 1: queued by the query thread, 2: sent by aclk_process_query().
// Reset to 0 by aclk_disconnect().
int aclk_metadata_submitted = 0;
// 0 until the query thread decides the collectors have settled, then 1.
// aclk_queue_query() drops every request while this is 0.
int agent_state = 0;
// Time of the last activity seen while agent_state was 0; refreshed by
// aclk_queue_query() and initialised in aclk_main().
time_t last_init_sequence = 0;
// 1 while no connection is up (cleared in aclk_connect(), set again in
// aclk_disconnect()); aclk_queue_query() drops every request while set.
int waiting_init = 1;

// Base topic for outgoing messages; built once by create_publish_base_topic().
char *global_base_topic = NULL;
// Set by aclk_main() when a reconnect attempt has been started;
// cleared by aclk_disconnect().
int aclk_connecting = 0;

/*
 * Generate a UUID and return its textual form (36 characters plus the
 * terminating NUL) in a newly allocated buffer owned by the caller.
 */
char *create_uuid()
{
    uuid_t generated;
    char *text;

    uuid_generate(generated);

    text = mallocz(36 + 1);
    uuid_unparse(generated, text);

    return text;
}

/*
 * JSON parser callback for a message received from the cloud.
 *
 * e->callback_data points to the struct aclk_request being filled in.
 * String members are duplicated into the request, the version number is
 * parsed from the original text. Every other entry type (objects, arrays,
 * booleans, nulls) and every unknown key is ignored.
 *
 * Always returns 0.
 */
int cloud_to_agent_parse(JSON_ENTRY *e)
{
    struct aclk_request *request = e->callback_data;

    if (e->type == JSON_STRING) {
        // The first matching key wins, exactly one field is set per entry
        if (!strcmp(e->name, ACLK_JSON_IN_MSGID))
            request->msg_id = strdupz(e->data.string);
        else if (!strcmp(e->name, ACLK_JSON_IN_TYPE))
            request->type_id = strdupz(e->data.string);
        else if (!strcmp(e->name, ACLK_JSON_IN_TOPIC))
            request->callback_topic = strdupz(e->data.string);
        else if (!strcmp(e->name, ACLK_JSON_IN_URL))
            request->payload = strdupz(e->data.string);
    }
    else if (e->type == JSON_NUMBER) {
        if (!strcmp(e->name, ACLK_JSON_IN_VERSION))
            request->version = atoi(e->original_string);
    }

    return 0;
}


// Set to 1 by the connection callback (aclk_connect) and cleared again by
// aclk_disconnect() and aclk_shutdown().
int aclk_connection_initialized = 0;
// TODO: revise the comment above if the two flags stay separate.
// aclk_connection_initialized is meant to say that the library is
// initialized and ready to be used; aclk_mqtt_connected is meant to say that
// there actually is an established connection. Nothing in the visible part
// of this file assigns aclk_mqtt_connected.
int aclk_mqtt_connected = 0;

// aclk_mutex      - held while building global_base_topic and around the
//                   _link_send_message() / _link_subscribe() calls
// query_mutex     - protects the pending query list (aclk_queue)
// collector_mutex - protects collector_list
static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER;
static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER;

// Shorthands; each one is used as a statement followed by ';'
#define ACLK_LOCK netdata_mutex_lock(&aclk_mutex)
#define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex)

#define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex)
#define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex)

#define QUERY_LOCK netdata_mutex_lock(&query_mutex)
#define QUERY_UNLOCK netdata_mutex_unlock(&query_mutex)

// Condition variable and its mutex, used by the query thread to sleep until
// somebody signals that there may be work to do (QUERY_THREAD_WAKEUP).
pthread_cond_t  query_cond_wait = PTHREAD_COND_INITIALIZER;
pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;

// All three macros expand to a single expression without a trailing ';'
// so they can be used like function calls, including inside if/else.
#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait)
#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)


/*
 * Maintain a list of collectors and their chart count.
 * If all the charts of a collector are deleted
 * then a new metadata dataset must be sent to the cloud.
 *
 * A collector is identified by (hostname, plugin name, module name).
 */
struct _collector {
    time_t created;            // not assigned by the visible code (stays zero from callocz)
    u_int32_t count;       //chart count
    u_int32_t hostname_hash;   // simple_hash() of hostname
    u_int32_t plugin_hash;     // simple_hash() of plugin_name, 1 when the name is NULL
    u_int32_t module_hash;     // simple_hash() of module_name, 1 when the name is NULL
    char *hostname;            // owned copy
    char *plugin_name;         // owned copy, may be NULL
    char *module_name;         // owned copy, may be NULL
    struct _collector *next;
};

// Head of the collector list. The first entry is a dummy node that is
// allocated lazily by _find_collector(); real entries start at ->next.
struct _collector *collector_list = NULL;

// One pending command. All string members are owned by the entry and are
// released by aclk_query_free().
struct aclk_query {
    time_t created;   // When the entry was queued
    time_t run_after; // Delay run until after this time
    ACLK_CMD cmd;     // What command is this
    char *topic;      // Topic to respond to
    char *data;       // Internal data (NULL if request from the cloud)
    char *msg_id;     // msg_id generated by the cloud (NULL if internal)
    char *query;      // The actual query
    u_char deleted;   // Mark deleted for garbage collect (entries marked this way are dropped by aclk_queue_pop())
    struct aclk_query *next;
};

// The pending query list. aclk_queue_query() inserts entries at the position
// computed by aclk_query_find_position(), i.e. ordered by run_after.
// Access is serialized with QUERY_LOCK / QUERY_UNLOCK.
struct aclk_query_queue {
    struct aclk_query *aclk_query_head; // next entry to run, NULL when empty
    struct aclk_query *aclk_query_tail; // used as a shortcut for appending
    u_int64_t count;                    // number of entries in the list
} aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 };

/*
 * Exponential backoff helper used after a connection failure.
 *
 * mode 0 resets the backoff state (and reseeds the random generator);
 *        0 is returned.
 * mode 1 returns the next delay in milliseconds. The very first call after
 *        a reset only arms the state and returns 0. After that the delay is
 *        2^n seconds plus up to 999 ms of random jitter, and once 2^n
 *        reaches ACLK_MAX_BACKOFF_DELAY it stays at
 *        ACLK_MAX_BACKOFF_DELAY * 1000 ms.
 *
 * The function never sleeps; the caller has to apply the delay.
 */
unsigned long int aclk_reconnect_delay(int mode)
{
    static int attempts = -1;
    unsigned long int seconds;

    if (mode == 0 || attempts == -1) {
        srandom(time(NULL));
        attempts = mode - 1;
        return 0;
    }

    seconds = (1 << attempts);

    if (seconds < ACLK_MAX_BACKOFF_DELAY) {
        attempts++;
        return (seconds * 1000) + (random() % 1000);
    }

    // Upper limit reached, stop growing
    return ACLK_MAX_BACKOFF_DELAY * 1000;
}

/*
 * Release a query entry together with the strings it owns.
 * Passing NULL is allowed and does nothing.
 */

void aclk_query_free(struct aclk_query *this_query)
{
    if (unlikely(this_query == NULL))
        return;

    freez(this_query->topic);

    if (likely(this_query->query != NULL))
        freez(this_query->query);

    if (likely(this_query->data != NULL))
        freez(this_query->data);

    if (likely(this_query->msg_id != NULL))
        freez(this_query->msg_id);

    freez(this_query);
}

// Find the entry after which a new entry that should run at time_to_run
// has to be linked. NULL means "insert at the head of the list".
// The caller must hold the QUERY lock.

struct aclk_query *aclk_query_find_position(time_t time_to_run)
{
    struct aclk_query *tail = aclk_queue.aclk_query_tail;
    struct aclk_query *previous = NULL;
    struct aclk_query *cursor;

    // Shortcut: the new entry does not run before the current last one
    if (likely(tail) && tail->run_after <= time_to_run)
        return tail;

    // Otherwise walk the list up to the first entry that runs later
    for (cursor = aclk_queue.aclk_query_head; cursor && cursor->run_after <= time_to_run; cursor = cursor->next)
        previous = cursor;

    return previous;
}

// Compare an optional search criterion against the value stored in an entry.
// A NULL criterion matches anything; a NULL stored value only matches a NULL
// criterion.
static int aclk_query_field_matches(const char *wanted, const char *stored)
{
    if (!wanted)
        return 1;

    return stored && strcmp(wanted, stored) == 0;
}

// Look for a pending (not deleted) entry with the given topic.
// query, data and msg_id are optional filters: when one of them is NULL it
// is not compared. cmd is currently not part of the comparison.
//
// On success the entry is returned and, when last_query is not NULL,
// *last_query receives the entry preceding it (NULL if it is the head).
// Returns NULL when nothing matches (or when topic is NULL).
//
// Need to have a QUERY lock before calling this
struct aclk_query *aclk_query_find(char *topic, char *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query)
{
    struct aclk_query *tmp_query, *prev_query;
    UNUSED(cmd);

    // Without a topic there is nothing we can compare against
    if (unlikely(!topic))
        return NULL;

    tmp_query = aclk_queue.aclk_query_head;
    prev_query = NULL;
    while (tmp_query) {
        // Entries may legitimately carry NULL query / data / msg_id (e.g.
        // internal commands), so every comparison has to be NULL safe
        if (likely(!tmp_query->deleted) &&
            tmp_query->topic && strcmp(tmp_query->topic, topic) == 0 &&
            aclk_query_field_matches(query, tmp_query->query) &&
            aclk_query_field_matches(data, tmp_query->data) &&
            aclk_query_field_matches(msg_id, tmp_query->msg_id)) {

            if (likely(last_query))
                *last_query = prev_query;
            return tmp_query;
        }
        prev_query = tmp_query;
        tmp_query = tmp_query->next;
    }
    return NULL;
}

/*
 * Add a query to execute, the result will be sent to the specified topic
 *
 *   topic     topic to respond to
 *   data      optional internal data, always copied
 *   msg_id    message id supplied by the cloud (only stored when !internal)
 *   query     the query itself
 *   run_after delay in seconds, relative to now
 *   internal  non-zero: topic and query are copied and msg_id is not stored;
 *             zero: the queue takes ownership of the topic, query and msg_id
 *             pointers as they are
 *   aclk_cmd  command to execute
 *
 * Requests are silently dropped while no connection is up (waiting_init) or
 * while the agent is not yet stable (agent_state == 0).
 * If an equivalent entry is already pending it is either kept (same run
 * time) or replaced by the new one.
 *
 * Always returns 0.
 */

int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd)
{
    struct aclk_query *new_query, *tmp_query;

    // Ignore all commands while we wait for the agent to initialize
    if (unlikely(waiting_init))
        return 0;

    // Ignore all commands if agent not stable and reset the last_init_sequence mark
    if (agent_state == 0) {
        last_init_sequence = now_realtime_sec();
        return 0;
    }

    // Absolute time to run at; kept as time_t so it is not squeezed into an int
    time_t run_at = now_realtime_sec() + run_after;

    QUERY_LOCK;
    struct aclk_query *last_query = NULL;

    tmp_query = aclk_query_find(topic, data, msg_id, query, aclk_cmd, &last_query);
    if (unlikely(tmp_query)) {
        if (tmp_query->run_after == run_at) {
            QUERY_UNLOCK;
            QUERY_THREAD_WAKEUP;
            return 0;
        }

        // Unlink the old entry, it will be replaced by the new one
        if (last_query)
            last_query->next = tmp_query->next;
        else
            aclk_queue.aclk_query_head = tmp_query->next;

        // Never leave the tail pointing to the entry we are about to free
        if (aclk_queue.aclk_query_tail == tmp_query)
            aclk_queue.aclk_query_tail = last_query;

        debug(D_ACLK, "Removing double entry");
        aclk_query_free(tmp_query);
        aclk_queue.count--;
    }

    new_query = callocz(1, sizeof(struct aclk_query));
    new_query->cmd = aclk_cmd;
    if (internal) {
        new_query->topic = strdupz(topic);
        if (likely(query))
            new_query->query = strdupz(query);
    } else {
        new_query->topic = topic;
        new_query->query = query;
        new_query->msg_id = msg_id;
    }

    if (data)
        new_query->data = strdupz(data);

    new_query->next = NULL;
    new_query->created = now_realtime_sec();
    new_query->run_after = run_at;

    debug(D_ACLK, "Added query (%s) (%s)", topic, query?query:"");

    tmp_query = aclk_query_find_position(run_at);

    if (tmp_query) {
        // Link after the entry found
        new_query->next = tmp_query->next;
        tmp_query->next = new_query;
    } else {
        // Link at the head of the list
        new_query->next = aclk_queue.aclk_query_head;
        aclk_queue.aclk_query_head = new_query;
    }

    // Whatever ended up last in the list is the tail; this also covers the
    // insertion into an empty queue
    if (!new_query->next)
        aclk_queue.aclk_query_tail = new_query;

    aclk_queue.count++;

    QUERY_UNLOCK;
    QUERY_THREAD_WAKEUP;
    return 0;
}

// Queue a request received from the cloud for immediate execution.
// The request's callback topic, message id and payload pointers are handed
// over to the queue as they are (not copied).
inline int aclk_submit_request(struct aclk_request *request)
{
    return aclk_queue_query(
        request->callback_topic, // topic to answer to
        NULL,                    // no internal data
        request->msg_id,
        request->payload,        // the query
        0,                       // run as soon as possible
        0,                       // not internal
        ACLK_CMD_CLOUD);
}

/*
 * Detach and return the next query that is ready to run.
 *
 * Entries flagged as deleted at the front of the queue are discarded first.
 * NULL is returned when the queue is empty or when the first entry is
 * scheduled for a later time.
 *
 * The caller owns the returned entry and releases it (topic, query and the
 * structure itself) with aclk_query_free().
 */
struct aclk_query *aclk_queue_pop()
{
    struct aclk_query *candidate;

    QUERY_LOCK;

    candidate = aclk_queue.aclk_query_head;
    if (likely(candidate == NULL)) {
        QUERY_UNLOCK;
        return NULL;
    }

    // Garbage collect the deleted entries at the front
    while (candidate && candidate->deleted) {
        struct aclk_query *following = candidate->next;

        aclk_queue.count--;
        aclk_queue.aclk_query_head = following;
        if (likely(following == NULL))
            aclk_queue.aclk_query_tail = NULL;

        aclk_query_free(candidate);
        candidate = following;
    }

    if (likely(candidate == NULL)) {
        QUERY_UNLOCK;
        return NULL;
    }

    // The first live entry is not due yet; leave it in the queue
    if (candidate->run_after > now_realtime_sec()) {
        info("Query %s will run in %ld seconds", candidate->query, candidate->run_after - now_realtime_sec());
        QUERY_UNLOCK;
        return NULL;
    }

    // Unlink it from the front of the queue
    aclk_queue.aclk_query_head = candidate->next;
    aclk_queue.count--;
    if (likely(aclk_queue.aclk_query_head == NULL))
        aclk_queue.aclk_query_tail = NULL;

    QUERY_UNLOCK;
    return candidate;
}

// Compute (once) and return the base topic under which the agent publishes;
// subtopics are sent as base_topic/subtopic.
// The topic is formatted from ACLK_TOPIC_STRUCTURE and the value returned by
// is_agent_claimed(), cut at the first newline, and cached in
// global_base_topic. Later calls return the cached value.
// Returns NULL while the agent is not claimed.
// NOTE(review): is_agent_claimed() is called twice and its result is never
// released here -- confirm who owns the returned string.

char *create_publish_base_topic()
{
    if (unlikely(!is_agent_claimed()))
        return NULL;

    ACLK_LOCK;

    if (unlikely(global_base_topic == NULL)) {
        char formatted[ACLK_MAX_TOPIC + 1];
        char *newline;

        snprintf(formatted, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, is_agent_claimed());

        // Keep only the first line
        newline = strchr(formatted, '\n');
        if (unlikely(newline != NULL))
            *newline = '\0';

        global_base_topic = strdupz(formatted);
    }

    ACLK_UNLOCK;
    return global_base_topic;
}

/*
 * Build the topic to use for sub_topic.
 *
 *   - a sub topic starting with '/' is considered absolute and is returned
 *     as it is
 *   - when no base topic is available yet, sub_topic is returned as it is
 *   - otherwise "<global_base_topic>/<sub_topic>" is written into
 *     final_topic (at most max_size bytes including the terminator, a
 *     longer result is truncated) and final_topic is returned
 *
 * Returns NULL when sub_topic is NULL, or when a topic has to be built but
 * there is no usable output buffer.
 */

char *get_topic(char *sub_topic, char *final_topic, int max_size)
{
    int rc;

    // A NULL sub topic must never reach the "%s" conversion below
    if (unlikely(!sub_topic))
        return NULL;

    if (likely(sub_topic[0] == '/'))
        return sub_topic;

    if (unlikely(!global_base_topic))
        return sub_topic;

    if (unlikely(!final_topic || max_size <= 0))
        return NULL;

    rc = snprintf(final_topic, max_size, "%s/%s", global_base_topic, sub_topic);
    if (unlikely(rc >= max_size))
        debug(D_ACLK, "Topic has been truncated to [%s] instead of [%s/%s]", final_topic, global_base_topic, sub_topic);

    return final_topic;
}


/*
 * Release a collector entry and the names it owns.
 * The entry must not be NULL and must already be unlinked from the list.
 */

static void _free_collector(struct _collector *collector)
{
    char *plugin = collector->plugin_name;
    char *module = collector->module_name;
    char *host = collector->hostname;

    if (likely(plugin != NULL))
        freez(plugin);

    if (likely(module != NULL))
        freez(module);

    if (likely(host != NULL))
        freez(host);

    freez(collector);
}

/*
 * Debug helper (ACLK_DEBUG builds only): log every collector in the list
 * together with its chart count. Takes the collector lock itself.
 */
#ifdef ACLK_DEBUG
static void _dump_connector_list()
{
    struct _collector *entry;

    COLLECTOR_LOCK;

    info("DUMPING ALL COLLECTORS");

    if (unlikely(!collector_list || !collector_list->next)) {
        COLLECTOR_UNLOCK;
        info("DUMPING ALL COLLECTORS -- nothing found");
        return;
    }

    // Skip the dummy head entry
    for (entry = collector_list->next; entry; entry = entry->next) {
        const char *plugin = entry->plugin_name ? entry->plugin_name : "";
        const char *module = entry->module_name ? entry->module_name : "";

        info("COLLECTOR %s : [%s:%s] count = %u", entry->hostname, plugin, module, entry->count);
    }

    info("DUMPING ALL COLLECTORS DONE");
    COLLECTOR_UNLOCK;
}
#endif

/*
 * Drop every collector from the list; only the dummy head entry stays.
 * Takes the collector lock itself, so it must be called without it.
 */
static void _reset_connector_list()
{
    struct _collector *detached;

    COLLECTOR_LOCK;

    if (unlikely(!collector_list || !collector_list->next)) {
        COLLECTOR_UNLOCK;
        return;
    }

    // Detach the whole chain from the dummy head entry ...
    detached = collector_list->next;
    collector_list->count = 0;
    collector_list->next = NULL;

    // ... after that nobody else can reach it, so free it without the lock
    COLLECTOR_UNLOCK;

    while (detached) {
        struct _collector *following = detached->next;

        _free_collector(detached);
        detached = following;
    }
}


/*
 * Look up the collector identified by (hostname, plugin_name, module_name).
 * The collector lock must be held by the caller.
 *
 * The dummy head entry of the list is created here when it does not exist.
 * A NULL plugin or module name is hashed as 1 and is not string compared.
 *
 * Returns the entry or NULL. When an entry is found and last_collector is
 * not NULL, *last_collector is set to the entry preceding it in the list
 * (used when deleting a collector).
 */
static struct _collector *_find_collector(const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector)
{
    struct _collector *current, *previous;
    uint32_t hostname_hash, plugin_hash, module_hash;

    if (unlikely(collector_list == NULL)) {
        collector_list = callocz(1, sizeof(struct _collector));
        return NULL;
    }

    if (unlikely(collector_list->next == NULL))
        return NULL;

    plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
    module_hash = module_name ? simple_hash(module_name) : 1;
    hostname_hash = simple_hash(hostname);

    // The first entry is the dummy head, start right after it
    for (previous = collector_list, current = collector_list->next; current; previous = current, current = current->next) {
        // Cheap checks first: all three hashes have to agree
        if (plugin_hash != current->plugin_hash || module_hash != current->module_hash ||
            hostname_hash != current->hostname_hash)
            continue;

        if (strcmp(hostname, current->hostname))
            continue;

        // Names are compared only when both sides have one
        if (plugin_name && current->plugin_name && strcmp(plugin_name, current->plugin_name))
            continue;

        if (module_name && current->module_name && strcmp(module_name, current->module_name))
            continue;

        if (unlikely(last_collector))
            *last_collector = previous;

        return current;
    }

    return NULL;
}

/*
 * Account for one deleted chart of a collector.
 * The chart count is decremented and, when it reaches zero, the entry is
 * unlinked from the list. The entry is returned in both cases (NULL when
 * the collector is unknown); an entry returned with count == 0 is no longer
 * in the list and has to be released by the caller.
 *
 * Lock before calling
 */
static struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
{
    struct _collector *previous = NULL;
    struct _collector *found = _find_collector(hostname, plugin_name, module_name, &previous);

    if (unlikely(found == NULL))
        return NULL;

    found->count--;

    // Last chart gone, take the collector out of the list
    if (unlikely(found->count == 0))
        previous->next = found->next;

    return found;
}


/*
 * Account for one new chart of a collector (plugin / module).
 * An unknown collector is created and linked right after the dummy head
 * entry; the chart count of the (new or existing) entry is incremented.
 *
 * Returns the entry. Lock before calling.
 */
static struct _collector  *_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
{
    struct _collector *entry = _find_collector(hostname, plugin_name, module_name, NULL);

    if (unlikely(entry == NULL)) {
        entry = callocz(1, sizeof(struct _collector));

        entry->hostname = strdupz(hostname);
        entry->hostname_hash = simple_hash(hostname);

        // Missing names are stored as NULL and hashed as 1
        if (plugin_name) {
            entry->plugin_name = strdupz(plugin_name);
            entry->plugin_hash = simple_hash(plugin_name);
        } else {
            entry->plugin_name = NULL;
            entry->plugin_hash = 1;
        }

        if (module_name) {
            entry->module_name = strdupz(module_name);
            entry->module_hash = simple_hash(module_name);
        } else {
            entry->module_name = NULL;
            entry->module_hash = 1;
        }

        entry->next = collector_list->next;
        collector_list->next = entry;
    }

    entry->count++;
    debug(D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name?plugin_name:"*", module_name?module_name:"*", entry->count);
    return entry;
}

/*
 * Register a chart of the given collector.
 * When this is the first chart of the collector (it was just added to the
 * list) a metadata update is queued for the cloud.
 */
void aclk_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
{
    struct _collector *entry;

    COLLECTOR_LOCK;

    entry = _add_collector(hostname, plugin_name, module_name);

    // Only a brand new collector triggers an update
    if (likely(entry->count == 1))
        aclk_queue_query("connector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT);

    COLLECTOR_UNLOCK;
}

/*
 * Unregister a chart of the given collector.
 * When the last chart of the collector is gone, _del_collector() has
 * removed the entry from the list; in that case a metadata update is queued
 * for the cloud and the entry is released here.
 * Nothing happens for an unknown collector.
 */
void aclk_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
{
    struct _collector *removed;
    int nothing_to_do;

    COLLECTOR_LOCK;

    removed = _del_collector(hostname, plugin_name, module_name);
    nothing_to_do = (!removed || removed->count);

    if (likely(!nothing_to_do))
        debug(D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name?plugin_name:"*", module_name?module_name:"*", removed->count);

    COLLECTOR_UNLOCK;

    if (unlikely(nothing_to_do))
        return;

    aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT);

    // The entry is not in the list anymore, we are the only owner
    _free_collector(removed);
}


// Wait for the ACLK connection to be established.
//
// Returns 0 when the connection is (or becomes) initialized.
// Returns 1 when the agent starts shutting down while waiting, or when the
// connection is still not up after ACLK_INITIALIZATION_WAIT seconds.
//
// While waiting, the link event loop is driven from here every
// ACLK_INITIALIZATION_SLEEP_WAIT seconds.
int aclk_wait_for_initialization()
{
    if (likely(aclk_connection_initialized))
        return 0;

    time_t started = now_realtime_sec();

    while (!aclk_connection_initialized && (now_realtime_sec() - started) < ACLK_INITIALIZATION_WAIT) {
        sleep_usec(USEC_PER_SEC * ACLK_INITIALIZATION_SLEEP_WAIT);
        _link_event_loop(0);

        // Give up only when the agent is exiting; otherwise keep waiting
        // until the connection is up or the timeout expires
        if (unlikely(netdata_exit))
            return 1;
    }

    if (unlikely(!aclk_connection_initialized)) {
        error("ACLK connection cannot be established");
        return 1;
    }

    return 0;
}

/*
 * Execute a query received from the cloud and publish the result.
 *
 * Only queries starting with "/api/v1/" are handled: a temporary web client
 * is set up, the request is passed to web_client_api_request_v1() and the
 * encoded response is sent to this_query->topic, wrapped in an "http"
 * message header.
 *
 * Note that this_query->query is modified in place (it is cut at the '?').
 *
 * Returns 0 when the query was handled, 1 when it was not (no query at all
 * or not an "/api/v1/" request).
 */
int aclk_execute_query(struct aclk_query *this_query)
{
    // A cloud request without a URL ends up here with a NULL query
    if (unlikely(!this_query || !this_query->query))
        return 1;

    if (strncmp(this_query->query, "/api/v1/", 8) == 0) {
        struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
        w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
        strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
        w->cookie1[0] = 0;      // Simulate web_client_create_on_fd()
        w->cookie2[0] = 0;      // Simulate web_client_create_on_fd()
        w->acl = 0x1f;

        // Split the path from the query string
        char *mysep = strchr(this_query->query, '?');
        if (mysep) {
            strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE);
            *mysep = '\0';
        } else
            strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE);

        // What follows the last '/' is passed on as the API request
        mysep = strrchr(this_query->query, '/');

        // TODO: handle bad response perhaps in a different way. For now it goes to the payload
        int rc = web_client_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop");
        BUFFER  *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
        buffer_flush(local_buffer);
        local_buffer->contenttype = CT_APPLICATION_JSON;

        aclk_create_header(local_buffer, "http", this_query->msg_id);

        // Error responses and badges are sent as a quoted string,
        // everything else is embedded as it is
        if (rc != HTTP_RESP_OK || strcmp(mysep?mysep+1:"noop", "badge.svg") == 0)
            buffer_sprintf(local_buffer, "\"%s\"", aclk_encode_response(w->response.data)->buffer);
        else
            buffer_sprintf(local_buffer, "%s", aclk_encode_response(w->response.data)->buffer);

        buffer_sprintf(local_buffer,"\n}");

        aclk_send_message(this_query->topic, local_buffer->buffer, this_query->msg_id);

        buffer_free(w->response.data);
        freez(w);
        buffer_free(local_buffer);
        return 0;
    }
    return 1;
}

/*
 * Fetch the next pending command and execute it.
 *
 * Returns 1 when an entry was taken from the queue (executed or garbage
 * collected) and 0 when there was nothing to do: no connection, empty queue
 * or next entry not due yet.
 */
int aclk_process_query()
{
    static long int processed = 0;
    struct aclk_query *job;

    if (!aclk_connection_initialized)
        return 0;

    job = aclk_queue_pop();
    if (likely(job == NULL))
        return 0;

    if (unlikely(job->deleted)) {
        debug(D_ACLK, "Garbage collect query %s:%s", job->topic, job->query);
        aclk_query_free(job);
        return 1;
    }

    processed++;

    debug(
        D_ACLK, "Query #%ld (%s) size=%ld in queue %d seconds", processed, job->topic, job->query?strlen(job->query):0,
        (int)(now_realtime_sec() - job->created));

    switch (job->cmd) {

        case ACLK_CMD_ONCONNECT:
            debug(D_ACLK, "EXECUTING on connect metadata command");
            aclk_send_metadata();
            aclk_metadata_submitted = 2;
            break;

        case ACLK_CMD_CHART:
            debug(D_ACLK, "EXECUTING a chart update command");
            aclk_send_single_chart(job->data, job->query);
            break;

        case ACLK_CMD_CHARTDEL:
            debug(D_ACLK, "EXECUTING a chart delete command");
            // TODO: for now a chart deletion resends the info metadata
            aclk_send_info_metadata();
            break;

        case ACLK_CMD_ALARM:
            debug(D_ACLK, "EXECUTING an alarm update command");
            aclk_send_message(job->topic, job->query, job->msg_id);
            break;

        case ACLK_CMD_ALARMS:
            debug(D_ACLK, "EXECUTING an alarms update command");
            aclk_send_alarm_metadata();
            break;

        case ACLK_CMD_CLOUD:
            debug(D_ACLK, "EXECUTING a cloud command");
            aclk_execute_query(job);
            break;

        default:
            break;
    }

    debug(
        D_ACLK, "Query #%ld (%s) done", processed, job->topic);

    aclk_query_free(job);

    return 1;
}

/*
 * Run pending queries until aclk_process_query() reports nothing left to do.
 *
 * Returns 0 when nothing was attempted (agent exiting, no connection or
 * empty queue) and 1 otherwise.
 */

int aclk_process_queries()
{
    int handled;

    if (unlikely(netdata_exit || !aclk_connection_initialized))
        return 0;

    if (likely(aclk_queue.count == 0))
        return 0;

    debug(D_ACLK, "Processing %d queries", (int ) aclk_queue.count);

    //TODO: may consider possible throttling here
    do {
        handled = aclk_process_query();
    } while (handled);

    return 1;
}

/*
 * Cleanup handler of the query thread: releases the collector list and
 * marks the thread as exited.
 *
 * ptr is the struct netdata_static_thread of the query thread.
 */
static void aclk_query_thread_cleanup(void *ptr)
{
    struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
    static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;

    info("cleaning up...");

    // _reset_connector_list() takes the collector lock on its own, so it
    // must be called without holding it (the mutex would be locked twice
    // by the same thread otherwise)
    _reset_connector_list();

    // Now release the dummy head entry. Clear the pointer under the lock so
    // nobody is left with a dangling list head; _find_collector() creates a
    // new one if it is ever needed again
    COLLECTOR_LOCK;
    freez(collector_list);
    collector_list = NULL;
    COLLECTOR_UNLOCK;

    static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}

/**
 * Main query processing thread
 *
 * On startup wait for the agent collectors to initialize
 * Expect at least a time of ACLK_STABLE_TIMEOUT seconds
 * of no new collectors coming in in order to mark the agent
 * as stable (set agent_state = 1)
 *
 * After that, loop until netdata exits: queue the "on_connect" metadata
 * command whenever aclk_metadata_submitted is 0, process the pending
 * queries and sleep on query_cond_wait until woken up.
 *
 * @param ptr is a pointer to the netdata_static_thread structure of this
 *            thread; it is handed to the cleanup handler.
 *
 * @return It always returns NULL
 */
void *aclk_query_main_thread(void *ptr)
{
    netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr);

    // Phase 1: wait until the collectors have been quiet for long enough
    while (!agent_state && !netdata_exit) {
        time_t  checkpoint;

        // Seconds since the last recorded init activity. Note that this is
        // sampled before the sleep below, not after it.
        checkpoint =  now_realtime_sec() - last_init_sequence;
        info("Waiting for agent collectors to initialize");
        sleep_usec(USEC_PER_SEC * ACLK_STABLE_TIMEOUT);
        if (checkpoint > ACLK_STABLE_TIMEOUT) {
            agent_state = 1;
            info("AGENT stable, last collector initialization activity was %ld seconds ago", checkpoint);
#ifdef ACLK_DEBUG
            _dump_connector_list();
#endif
        }
    }

    // Phase 2: process queries until shutdown
    while (!netdata_exit) {

        // 0 means the metadata has to be (re)sent; aclk_disconnect() resets
        // the flag to 0, aclk_process_query() sets it to 2 once sent
        if (unlikely(!aclk_metadata_submitted)) {
            aclk_metadata_submitted = 1;
            aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT);
        }

        aclk_process_queries();

        QUERY_THREAD_LOCK;

        // TODO: Need to check if there are queries awaiting already
        // NOTE(review): the wait has no predicate, so a wakeup signalled
        // between aclk_process_queries() and this point looks like it would
        // be missed until the next signal -- confirm.
        // If the wait itself fails, back off for a second instead of spinning.
        if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait)))
            sleep_usec(USEC_PER_SEC * 1);

        QUERY_THREAD_UNLOCK;

    } // forever
    info("Shutting down query processing thread");
    // presumably pops and runs aclk_query_thread_cleanup (non-zero argument) -- TODO confirm
    netdata_thread_cleanup_pop(1);
    return NULL;
}

// Cleanup handler of the main ACLK thread.
// ptr is the struct netdata_static_thread of that thread. The query thread
// is signalled so it gets a chance to notice the shutdown.
static void aclk_main_cleanup(void *ptr)
{
    struct netdata_static_thread *me = ptr;

    me->enabled = NETDATA_MAIN_THREAD_EXITING;

    info("cleaning up...");

    // The query thread may be sleeping on its condition variable
    QUERY_THREAD_WAKEUP;

    me->enabled = NETDATA_MAIN_THREAD_EXITED;
}

/**
 * Main agent cloud link thread
 *
 * This thread will simply call the main event loop that handles
 * pending requests - both inbound and outbound
 *
 * @param ptr is a pointer to the netdata_static_thread structure.
 *
 * @return It always returns NULL
 */
void *aclk_main(void *ptr)
{
    struct netdata_static_thread *query_thread;

    netdata_thread_cleanup_push(aclk_main_cleanup, ptr);

    info("Waiting for netdata to be ready");
    while (!netdata_ready) {
        sleep_usec(USEC_PER_MS * 300);
    }

    last_init_sequence = now_realtime_sec();
    query_thread = NULL;

    aclk_hostname = config_get(CONFIG_SECTION_ACLK, "agent cloud link hostname", ACLK_DEFAULT_HOST);
    aclk_port = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link port", ACLK_DEFAULT_PORT);


     // TODO: This may change when we have enough info from the claiming itself to avoid wasting 60 seconds
     // TODO: Handle the unclaim command as well -- we may need to shutdown the connection
    while(likely(!is_agent_claimed())) {
        sleep_usec(USEC_PER_SEC * 5);
        if(netdata_exit)
            goto exited;
    }
    create_publish_base_topic();

    usec_t reconnect_expiry = 0; // In usecs

    while (!netdata_exit) {
        static int first_init = 0;
        _link_event_loop(ACLK_LOOP_TIMEOUT * 1000);
        debug(D_ACLK,"LINK event loop called");

        if (unlikely(!aclk_connection_initialized)) {
            if (unlikely(first_init)) {
                aclk_try_to_connect();
                first_init = 1;
            } else {
                if (aclk_connecting == 0) {
                    if (reconnect_expiry == 0) {
                        unsigned long int delay = aclk_reconnect_delay(1);
                        reconnect_expiry = now_realtime_usec() + delay * 1000;
                        info("Retrying to establish the ACLK connection in %.3f seconds", delay / 1000.0);
                    }
                    if (now_realtime_usec() >= reconnect_expiry) {
                        reconnect_expiry = 0;
                        aclk_connecting = 1;
                        aclk_try_to_connect();
                    }
                    sleep_usec(USEC_PER_MS * 100);
                }
            }
            continue;
        }

        if (likely(aclk_mqtt_connected)) {

            if (unlikely(!aclk_subscribed)) {
                aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 2);
            }

            if (unlikely(!query_thread)) {
                query_thread = callocz(1, sizeof(struct netdata_static_thread));
                query_thread->thread = mallocz(sizeof(netdata_thread_t));
                netdata_thread_create(
                    query_thread->thread, ACLK_THREAD_NAME, NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread,
                    query_thread);
            }
        }
    } // forever
exited:
    aclk_shutdown();

    netdata_thread_cleanup_pop(1);
    return NULL;
}

/*
 * Send a message to the cloud, using a base topic and sib_topic
 * The final topic will be in the form <base_topic>/<sub_topic>
 * If base_topic is missing then the global_base_topic will be used (if available)
 *
 */
int aclk_send_message(char *sub_topic, char *message, char *msg_id)
{
    int rc;
    int mid;
    char topic[ACLK_MAX_TOPIC + 1];
    char *final_topic;

    UNUSED(msg_id);

    if (unlikely(aclk_wait_for_initialization()))
        return 1;

    if (unlikely(!message))
        return 0;

    final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);

    if (unlikely(!final_topic)) {
        errno = 0;
        error("Unable to build outgoing topic; truncated?");
        return 1;
    }

    ACLK_LOCK;
    rc = _link_send_message(final_topic, message, &mid);
    // TODO: link the msg_id with the mid so we can trace it
    ACLK_UNLOCK;


    if (unlikely(rc)) {
        errno = 0;
        error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc));
    }

    return rc;
}

/*
 * Subscribe to a topic in the cloud
 * The final subscription will be in the form
 * /agent/claim_id/<sub_topic>
 */
int aclk_subscribe(char *sub_topic, int qos)
{
    int rc;
    char topic[ACLK_MAX_TOPIC + 1];
    char *final_topic;

    if (unlikely(aclk_wait_for_initialization()))
        return 1;

    final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
    if (unlikely(!final_topic)) {
        errno = 0;
        error("Unable to build outgoing topic; truncated?");
        return 1;
    }

    ACLK_LOCK;
    rc = _link_subscribe(final_topic, qos);
    ACLK_UNLOCK;

    // TODO: Add better handling -- error will flood the logfile here
    if (unlikely(rc)) {
        errno = 0;
        error("Failed subscribe to command topic %d (%s)", rc, _link_strerror(rc));
    }

    return rc;
}


// Link-up callback: marks the connection as initialized, lets
// aclk_queue_query() accept commands again, resets the reconnect backoff
// and wakes up the query thread. ptr is not used.
void aclk_connect(void *ptr)
{
    UNUSED(ptr);

    info("Connection detected");

    aclk_connection_initialized = 1;
    waiting_init = 0;

    aclk_reconnect_delay(0);

    QUERY_THREAD_WAKEUP;
}

// Link-down callback: puts the connection state back to "not connected" so
// that the subscription and the metadata are redone after the next connect.
// ptr is not used.
void aclk_disconnect(void *ptr)
{
    UNUSED(ptr);

    // Log only the transition from a working connection
    if (likely(aclk_connection_initialized != 0))
        info("Disconnect detected");

    aclk_subscribed = 0;
    aclk_metadata_submitted = 0;
    waiting_init = 1;
    aclk_connection_initialized = 0;
    aclk_connecting = 0;
}

// Shut the agent cloud link down: mark the connection as no longer
// initialized, then call _link_shutdown(). Start and completion are logged.
void aclk_shutdown()
{
    info("Shutdown initiated");
    // Flag is cleared before the link library is asked to shut down
    aclk_connection_initialized = 0;
    _link_shutdown();
    info("Shutdown complete");
}

// Initialize the link library for the configured host and port,
// registering aclk_connect / aclk_disconnect as the link up / down
// callbacks. A failure is logged; nothing is returned to the caller.
void aclk_try_to_connect()
{
    if (unlikely(_link_lib_init(aclk_hostname, aclk_port, aclk_connect, aclk_disconnect)))
        error("Failed to initialize the agent cloud link library");
}


/*
 * Append the common message header to dest.
 *
 * The header opens a JSON object with the "type", "msg-id", "timestamp"
 * and "version" keys and ends right after the "payload" key, so the
 * caller has to append the payload value and the closing brace.
 *
 *  dest    buffer the header is appended to
 *  type    value written for the "type" key
 *  msg_id  value written for the "msg-id" key; when NULL a fresh UUID is
 *          generated and used instead
 */
inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id)
{
    char generated_id[36 + 1];

    if (unlikely(!msg_id)) {
        uuid_t uuid;

        uuid_generate(uuid);
        uuid_unparse(uuid, generated_id);
        msg_id = generated_id;
    }

    // time_t is not guaranteed to be a long, so convert explicitly to a
    // type that matches the format specifier used below
    long long time_created = (long long)now_realtime_sec();

    buffer_sprintf(
        dest,
        "\t{\"type\": \"%s\",\n"
        "\t\"msg-id\": \"%s\",\n"
        "\t\"timestamp\": %lld,\n"
        "\t\"version\": %d,\n"
        "\t\"payload\": ",
        type, msg_id, time_created, ACLK_VERSION);

    debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%lld]", ACLK_VERSION, msg_id, type, time_created);
}

//#define EYE_FRIENDLY

/*
 * Take a buffer, escape its contents and rewrite it in place.
 *
 * Bytes are processed up to the first NUL:
 *   - newline (0x0a)            becomes a backslash followed by 'n'
 *   - 0x01-0x09 and 0x0b-0x1f   become a backslash, "00" and two hex
 *                               digits (the low digit comes from to_hex())
 *   - single and double quotes  are prefixed with a backslash
 *   - anything else             is copied unchanged
 *
 * The buffer is flushed and refilled with the escaped text, and the same
 * BUFFER pointer is returned so the call can be used inline.
 * When EYE_FRIENDLY is defined the buffer is returned untouched.
 *
 * NOTE(review): a backslash followed by "00XX" or by a single quote is
 * not a standard JSON escape, and backslashes in the input are not
 * escaped. The emitted format is kept as it was; confirm what the
 * receiving side expects before changing it.
 */
BUFFER *aclk_encode_response(BUFFER *contents)
{
#ifdef EYE_FRIENDLY

    return contents;
#else
    // Worst case every input byte is a control character, which expands
    // to 5 output bytes; one extra byte holds the terminating NUL.
    // The loop below stops at the first NUL, so strlen() is exactly the
    // number of bytes that will be processed.
    const size_t max_expansion = 5;
    size_t input_len = strlen(contents->buffer);
    char *tmp_buffer = mallocz(input_len * max_expansion + 1);
    char *src, *dst;

    src = contents->buffer;
    dst = tmp_buffer;
    while (*src) {
        switch (*src) {
            case '\n':
                *dst++ = '\\';
                *dst++ = 'n';
                break;
            case 0x01 ... 0x09:
            case 0x0b ... 0x1F:
                *dst++ = '\\';
                *dst++ = '0';
                *dst++ = '0';
                // High hex digit: 0x01-0x0f -> '0', 0x10-0x1f -> '1'
                *dst++ = (*src < 0x10) ? '0' : '1';
                *dst++ = to_hex(*src);
                break;
            case '\"':
            case '\'':
                *dst++ = '\\';
                *dst++ = *src;
                break;
            default:
                *dst++ = *src;
        }
        src++;
    }
    *dst = '\0';

    // Replace the original contents with the escaped copy
    buffer_flush(contents);
    buffer_sprintf(contents, "%s", tmp_buffer);

    freez(tmp_buffer);
    return contents;
#endif
}

/*
 * Build and send the alarm metadata as a single "connect_alarms" message
 * on ACLK_ALARMS_TOPIC.
 *
 * The payload is an object with three members, filled in this order:
 *   "configured-alarms"  from health_alarms2json()
 *   "alarm-log"          from health_alarm_log2json()
 *   "alarms-active"      from health_alarms_values2json()
 * all of them for localhost.
 */
void aclk_send_alarm_metadata()
{
    BUFFER *wb = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
    char *msg_id = create_uuid();

    buffer_flush(wb);
    wb->contenttype = CT_APPLICATION_JSON;

    debug(D_ACLK,"Metadata alarms start");

    // The header ends right after the "payload" key; the object opened
    // below is its value
    aclk_create_header(wb, "connect_alarms", msg_id);

    buffer_sprintf(wb,"{\n\t \"configured-alarms\" : ");
    health_alarms2json(localhost, wb, 1);
    debug(D_ACLK,"Metadata %s with configured alarms has %ld bytes", msg_id, wb->len);

    buffer_sprintf(wb,",\n\t \"alarm-log\" : ");
    health_alarm_log2json(localhost, wb, 0);
    debug(D_ACLK,"Metadata %s with alarm_log has %ld bytes", msg_id, wb->len);

    buffer_sprintf(wb,",\n\t \"alarms-active\" : ");
    health_alarms_values2json(localhost, wb, 0);
    debug(D_ACLK,"Metadata %s with alarms_active has %ld bytes", msg_id, wb->len);

    // Close the payload object, then the message object opened by the header
    buffer_sprintf(wb,"\n}\n}");

    // Encoding rewrites wb in place, so the size logged afterwards is
    // the encoded size
    char *encoded = aclk_encode_response(wb)->buffer;
    aclk_send_message(ACLK_ALARMS_TOPIC, encoded, msg_id);
    debug(D_ACLK,"Metadata %s encoded has %ld bytes", msg_id, wb->len);

    buffer_free(wb);
    freez(msg_id);
}

/*
 * Build and send the "connect" metadata message on ACLK_METADATA_TOPIC.
 *
 * The payload is an object with two members for localhost:
 *   "info"    from web_client_api_request_v1_info_fill_buffer()
 *   "charts"  from charts2json()
 *
 * Always returns 0; the result of aclk_send_message() is not propagated.
 */
int aclk_send_info_metadata()
{
    BUFFER *wb = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);

    debug(D_ACLK,"Metadata /info start");

    char *msg_id = create_uuid();

    buffer_flush(wb);
    wb->contenttype = CT_APPLICATION_JSON;

    // The header ends right after the "payload" key; the object opened
    // below is its value
    aclk_create_header(wb, "connect", msg_id);

    buffer_sprintf(wb,"{\n\t \"info\" : ");
    web_client_api_request_v1_info_fill_buffer(localhost, wb);
    debug(D_ACLK,"Metadata %s with info has %ld bytes", msg_id, wb->len);

    buffer_sprintf(wb,", \n\t \"charts\" : ");
    charts2json(localhost, wb, 1);

    // Close the payload object, then the message object opened by the header
    buffer_sprintf(wb,"\n}\n}");
    debug(D_ACLK,"Metadata %s with chart has %ld bytes", msg_id, wb->len);

    // Encoding rewrites wb in place, so the size logged afterwards is
    // the encoded size
    char *encoded = aclk_encode_response(wb)->buffer;
    aclk_send_message(ACLK_METADATA_TOPIC, encoded, msg_id);
    debug(D_ACLK,"Metadata %s encoded has %ld bytes", msg_id, wb->len);

    freez(msg_id);
    buffer_free(wb);

    return 0;
}

// Send the complete metadata to the cloud: first the info/charts message,
// then the alarm metadata message.
// Always returns 0; the result of the info message is deliberately ignored.
int aclk_send_metadata()
{
    (void)aclk_send_info_metadata();
    aclk_send_alarm_metadata();

    return 0;
}

// Turn off individual updates: while the flag is set, aclk_update_chart()
// and aclk_update_alarm() return without queueing anything.
void aclk_single_update_disable()
{
    aclk_disable_single_updates = 1;
}

// Turn individual updates back on by clearing the flag checked by
// aclk_update_chart() and aclk_update_alarm().
void aclk_single_update_enable()
{
    aclk_disable_single_updates = 0;
}

// Triggered by a health reload: queues an "on_connect" query
// (ACLK_CMD_ONCONNECT) -- presumably this makes the metadata, including
// the alarm metadata, get sent again (confirm against the query handler).
// Does nothing while agent_state is 0.
void aclk_alarm_reload()
{
    if (likely(agent_state))
        aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT);
}
//rrd_stats_api_v1_chart(RRDSET *st, BUFFER *buf)

/*
 * Send the definition of one chart as a "chart" message on
 * ACLK_CHART_TOPIC.
 *
 *  hostname  host to look the chart up on (rrdhost_find_by_hostname())
 *  chart     chart identifier; tried with rrdset_find() first and with
 *            rrdset_find_byname() as a fallback
 *
 * Returns 1 when the host or the chart cannot be found, 0 otherwise
 * (the result of aclk_send_message() is not propagated).
 */
int aclk_send_single_chart(char *hostname, char *chart)
{
    RRDHOST *host = rrdhost_find_by_hostname(hostname, 0);
    if (host == NULL)
        return 1;

    RRDSET *st = rrdset_find(host, chart);
    if (st == NULL) {
        st = rrdset_find_byname(host, chart);
        if (st == NULL) {
            info("FAILED to find chart %s", chart);
            return 1;
        }
    }

    char *msg_id = NULL;
    BUFFER *wb = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
    msg_id = create_uuid();
    buffer_flush(wb);
    wb->contenttype = CT_APPLICATION_JSON;

    // Header, then the chart JSON as the payload, then the closing brace
    // of the message object opened by the header
    aclk_create_header(wb, "chart", msg_id);
    rrdset2json(st, wb, NULL, NULL, 1);
    buffer_sprintf(wb,"\t\n}");

    char *encoded = aclk_encode_response(wb)->buffer;
    aclk_send_message(ACLK_CHART_TOPIC, encoded, msg_id);

    buffer_free(wb);
    freez(msg_id);
    return 0;
}

/*
 * Queue an update for a single chart.
 *
 *  host        host the chart belongs to; only localhost is handled
 *  chart_name  passed on to aclk_queue_query() as the query
 *  aclk_cmd    command the query is queued with
 *
 * Nothing is queued when the host is not localhost or while individual
 * updates are disabled (aclk_disable_single_updates).
 * Without ENABLE_ACLK this is a no-op.
 *
 * Always returns 0.
 */
int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
{
#ifndef ENABLE_ACLK
    UNUSED(host);
    UNUSED(chart_name);
    // aclk_cmd is unused in this configuration as well; mark it like the
    // other parameters to avoid an unused-parameter warning
    UNUSED(aclk_cmd);
    return 0;
#else
    if (host != localhost)
        return 0;

    if (unlikely(aclk_disable_single_updates))
        return 0;

    aclk_queue_query("_chart", host->hostname, NULL, chart_name, 0, 1, aclk_cmd);
    return 0;
#endif
}


/*
 * Queue an update for a single alarm entry as an "alarms" message.
 *
 *  host  host the alarm belongs to; only localhost is handled
 *  ae    alarm entry that is rendered into the message
 *
 * Nothing is queued when the host is not localhost, while agent_state
 * is 0, or while individual updates are disabled.
 *
 * Always returns 0.
 */
int    aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
{
    /*
     * The last check covers individual updates being disabled.
     * This will be the case when we do health reload
     * and all the alarms will be dropped and recreated.
     * At the end of the health reload the complete alarm metadata
     * info will be sent
     */
    if (host != localhost || agent_state == 0 || unlikely(aclk_disable_single_updates))
        return 0;

    BUFFER *wb = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
    char *msg_id = create_uuid();

    buffer_flush(wb);
    aclk_create_header(wb, "alarms", msg_id);

    // The entry is rendered with the _nolock variant, so the alarm log
    // read lock is held around the call
    netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
    health_alarm_entry2json_nolock(wb, ae, host);
    netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);

    // Close the message object opened by the header
    buffer_sprintf(wb,"\n}");

    char *encoded = aclk_encode_response(wb)->buffer;
    aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, encoded, 0, 1, ACLK_CMD_ALARM);

    // NOTE(review): both are released right after queueing, which assumes
    // aclk_queue_query() keeps its own copies -- confirm
    buffer_free(wb);
    freez(msg_id);

    return 0;
}

/*
 * Parse the incoming payload and queue a command if valid
 */
int aclk_handle_cloud_request(char *payload)
{
    struct aclk_request cloud_to_agent = { .type_id = NULL, .msg_id = NULL, .callback_topic = NULL, .payload = NULL, .version = 0};

    if (unlikely(!payload)) {
        debug(D_ACLK, "ACLK incoming message is empty");
        return 0;
    }

    debug(D_ACLK, "ACLK incoming message [%s]", payload);

    int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse);

    if (unlikely(
            JSON_OK != rc || !cloud_to_agent.payload || !cloud_to_agent.callback_topic || !cloud_to_agent.msg_id ||
            !cloud_to_agent.type_id || cloud_to_agent.version > ACLK_VERSION ||
            strcmp(cloud_to_agent.type_id, "http"))) {

        if (JSON_OK != rc)
            error("Malformed json request (%s)", payload);

        if (cloud_to_agent.version > ACLK_VERSION)
            error("Unsupported version in JSON request %d", cloud_to_agent.version);

        if (cloud_to_agent.payload)
            freez(cloud_to_agent.payload);

        if (cloud_to_agent.type_id)
            freez(cloud_to_agent.type_id);

        if (cloud_to_agent.msg_id)
            freez(cloud_to_agent.msg_id);

        if (cloud_to_agent.callback_topic)
            freez(cloud_to_agent.callback_topic);

        return 1;
    }

    aclk_submit_request(&cloud_to_agent);

    // Note: the payload comes from the callback and it will be automatically freed
    return 0;
}
